package com.codejiwei.flink.source;

import com.codejiwei.flink.entity.MarketingUserBehavior;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

/**
 * @author jiwei
 * @description
 * @date 2023/5/23 9:24
 */
public class AppMarketingDataSource extends RichSourceFunction<MarketingUserBehavior> {
    Random random = new Random();
    boolean runFlag = true;
    List<String> channels = Arrays.asList("huawei", "xiaomi", "apple", "baidu", "qq", "oppo", "vivo");
    List<String> behaviors = Arrays.asList("download", "install", "update", "uninstall");
    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");


    @Override
    public void run(SourceContext<MarketingUserBehavior> sourceContext) throws Exception {
        while (runFlag) {
            long currentTimeMillis = System.currentTimeMillis();
            MarketingUserBehavior marketingUserBehavior = new MarketingUserBehavior(
                    (long) random.nextInt(1000),
                    behaviors.get(random.nextInt(behaviors.size())),
                    sdf.format(currentTimeMillis),
                    channels.get(random.nextInt(channels.size())),
                    currentTimeMillis
            );
            sourceContext.collect(marketingUserBehavior);
            Thread.sleep(2000);
        }
    }

    @Override
    public void cancel() {
        runFlag = false;
    }
}
